package cn.mabach.portal.kafka;

import cn.mabach.portal.dianping.mongo.Recs;
import cn.mabach.portal.dianping.mongo.Res;
import cn.mabach.portal.kafka.mongoEntity.ShopAccess;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * Kafka listener that turns user access-log messages into per-user shop recommendations.
 *
 * <p>For every message on {@code topic4} whose text contains an access-log payload of the form
 * {@code aclog_ <uid> <action> <shopId>}, the consumer:
 * <ol>
 *   <li>updates the user's interaction history (shop id to {@link ShopAccess}) that is stored in
 *       Redis under the key {@code uid};</li>
 *   <li>keeps only the 20 most recently touched shops and writes them back with a 30 minute
 *       expiry;</li>
 *   <li>loads the recommendation list ({@link Res#getRecs()}) of each of those shops from MongoDB,
 *       weights every candidate by its score multiplied by the user's interest in the source
 *       shop, and stores the ids of the 100 best candidates in Redis under
 *       {@code SIMILAR_<uid>} for one day.</li>
 * </ol>
 *
 * @Author: ming
 * @Date: 2020/3/26 4:27 PM
 */
@Component
@Slf4j
public class Consumer {

    /** Number of most recently accessed shops kept in a user's history. */
    private static final int MAX_RECENT_SHOPS = 20;

    /** Number of entries taken from the head of each shop's recommendation list. */
    private static final int MAX_SIMILAR_PER_SHOP = 10;

    /** Number of recommended shop ids stored per user. */
    private static final int MAX_RECOMMENDED_SHOPS = 100;

    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Processes one access-log record and refreshes the user's history and recommendations.
     *
     * <p>Records without a value, without the {@code aclog_} marker, with an empty uid, or whose
     * payload cannot be parsed are skipped; malformed payloads are logged at WARN level instead
     * of failing the listener, because re-delivering them could never succeed.
     *
     * @param record the Kafka record; its value's {@code toString()} is parsed as the log line
     * @throws Exception if Redis or MongoDB access fails
     */
    @KafkaListener(topics = "topic4")
    public void listen (ConsumerRecord<?, ?> record) throws Exception {
        Object value = record.value();
        if (value == null) {
            // Nothing to parse (e.g. a record published without a payload).
            return;
        }
        String message = value.toString();
        if (!message.contains("aclog_")) {
            return;
        }

        // The payload is whatever follows the first "aclog_ " marker: "<uid> <action> <shopId>".
        String[] parts = message.split("aclog_ ");
        String[] fields = parts.length > 1 ? parts[1].split(" ") : new String[0];
        if (fields.length < 3) {
            log.warn("Skipping malformed access log message: {}", message);
            return;
        }

        String uid = fields[0];
        if (StringUtils.isEmpty(uid)) {
            return;
        }
        String access = fields[1];
        Integer shopId;
        try {
            shopId = Integer.valueOf(fields[2]);
        } catch (NumberFormatException e) {
            log.warn("Skipping access log message with non-numeric shop id: {}", message);
            return;
        }

        // Record the action on the existing entry for this shop, or on a fresh one.
        Map<Integer, ShopAccess> history = loadHistory(uid);
        ShopAccess shopAccess = history.get(shopId);
        if (shopAccess == null) {
            shopAccess = new ShopAccess();
        }
        history.put(shopId, setuac(shopAccess, access));

        // Keep only the most recently touched shops and persist them, newest first.
        List<Map.Entry<Integer, ShopAccess>> recent = mostRecent(history);
        LinkedHashMap<Integer, ShopAccess> newMap = new LinkedHashMap<>();
        for (Map.Entry<Integer, ShopAccess> entry : recent) {
            newMap.put(entry.getKey(), entry.getValue());
        }
        redisTemplate.boundValueOps(uid).set(newMap, 30, TimeUnit.MINUTES);
        log.info("整理后的评分过的shopmap长度为======>:{}", newMap.size());

        // Rank the shops recommended for the recent ones and store the best ids for this user.
        Map<Integer, Double> candidateWeights = collectCandidateWeights(recent, history.keySet());
        ArrayList<Integer> simshopIds = topCandidateIds(candidateWeights);
        redisTemplate.boundValueOps("SIMILAR_" + uid).set(simshopIds, 1, TimeUnit.DAYS);
        log.info("最相似的前{}个物品id集合", simshopIds.size());
    }

    /**
     * Loads the user's interaction history from Redis.
     *
     * @param uid the Redis key holding the history
     * @return the stored map, or a new empty mutable map when nothing (or an empty map) is stored
     */
    @SuppressWarnings("unchecked")
    private Map<Integer, ShopAccess> loadHistory(String uid) {
        Map<Integer, ShopAccess> stored =
                (Map<Integer, ShopAccess>) redisTemplate.boundValueOps(uid).get();
        if (stored == null || stored.isEmpty()) {
            return new HashMap<Integer, ShopAccess>();
        }
        return stored;
    }

    /**
     * Returns the history entries ordered by {@code ShopAccess.getDate()} descending, capped at
     * {@link #MAX_RECENT_SHOPS}.
     */
    private List<Map.Entry<Integer, ShopAccess>> mostRecent(Map<Integer, ShopAccess> history) {
        List<Map.Entry<Integer, ShopAccess>> entries = new ArrayList<>(history.entrySet());
        Collections.sort(entries, new Comparator<Map.Entry<Integer, ShopAccess>>() {
            @Override
            public int compare(Map.Entry<Integer, ShopAccess> o1, Map.Entry<Integer, ShopAccess> o2) {
                // Arguments swapped so that the newest entry sorts first.
                return Long.compare(o2.getValue().getDate(), o1.getValue().getDate());
            }
        });
        return entries.size() > MAX_RECENT_SHOPS ? entries.subList(0, MAX_RECENT_SHOPS) : entries;
    }

    /**
     * Builds a map from candidate shop id to its best weight across all recent shops.
     *
     * <p>A candidate's weight is its {@code Recs} score multiplied by the user's interaction
     * weight for the shop that recommended it. Shops the user has already accessed are excluded,
     * and when a candidate is recommended by several shops the largest weight wins.
     *
     * @param recent  the user's most recent history entries
     * @param visited ids of every shop in the user's history; these are never recommended
     */
    private Map<Integer, Double> collectCandidateWeights(
            List<Map.Entry<Integer, ShopAccess>> recent, Set<Integer> visited) {
        Map<Integer, Double> candidateWeights = new HashMap<>();
        for (Map.Entry<Integer, ShopAccess> entry : recent) {
            Integer sid = entry.getKey();
            Double sourceWeight = getShopWeight(entry.getValue(), sid);

            Res res = mongoTemplate.findById(sid, Res.class);
            if (res == null || res.getRecs() == null) {
                // No recommendation document for this shop: nothing to contribute.
                continue;
            }

            // Only the head of the list is used; presumably it is ordered by descending
            // similarity — TODO confirm against the job that writes Res documents.
            List<Recs> recs = res.getRecs();
            if (recs.size() > MAX_SIMILAR_PER_SHOP) {
                recs = recs.subList(0, MAX_SIMILAR_PER_SHOP);
            }

            for (Recs rec : recs) {
                Integer recId = rec.get_id();
                if (visited.contains(recId)) {
                    continue;
                }
                double weight = rec.getScore() * sourceWeight;
                Double oldWeight = candidateWeights.get(recId);
                if (oldWeight != null && oldWeight > weight) {
                    // Keep the larger weight already recorded for this candidate.
                    continue;
                }
                candidateWeights.put(recId, weight);
            }
        }
        return candidateWeights;
    }

    /**
     * Returns the ids of the highest-weighted candidates, best first, capped at
     * {@link #MAX_RECOMMENDED_SHOPS}.
     */
    private ArrayList<Integer> topCandidateIds(Map<Integer, Double> candidateWeights) {
        List<Map.Entry<Integer, Double>> ranked = new ArrayList<>(candidateWeights.entrySet());
        Collections.sort(ranked, new Comparator<Map.Entry<Integer, Double>>() {
            @Override
            public int compare(Map.Entry<Integer, Double> o1, Map.Entry<Integer, Double> o2) {
                // Arguments swapped so that the largest weight sorts first.
                return Double.compare(o2.getValue(), o1.getValue());
            }
        });

        int limit = Math.min(ranked.size(), MAX_RECOMMENDED_SHOPS);
        // Copy into a standalone list: this object is what gets serialized into Redis.
        ArrayList<Integer> ids = new ArrayList<>(limit);
        for (int i = 0; i < limit; i++) {
            ids.add(ranked.get(i).getKey());
        }
        return ids;
    }

    /**
     * Computes how strongly the user is interested in a shop from the recorded actions.
     *
     * <p>Each recorded action contributes a fixed score (click 1, collect 2, share 2, buy 3,
     * pay 5); the sum is multiplied by the factor {@code ln(1 + 1 / (1 + hours))}. Flags that
     * were never set count as "not done".
     *
     * @param shopAccess the recorded actions and last-access timestamp for the shop
     * @param id         the shop id (currently unused)
     * @return the interaction weight
     */
    private Double getShopWeight(ShopAccess shopAccess,Integer id) {
        int score = 0;
        if (Boolean.TRUE.equals(shopAccess.getClick())) {
            score += 1;
        }
        if (Boolean.TRUE.equals(shopAccess.getCollect())) {
            score += 2;
        }
        if (Boolean.TRUE.equals(shopAccess.getShare())) {
            score += 2;
        }
        if (Boolean.TRUE.equals(shopAccess.getBuy())) {
            score += 3;
        }
        if (Boolean.TRUE.equals(shopAccess.getPay())) {
            score += 5;
        }

        long minutes = (System.currentTimeMillis() - shopAccess.getDate()) / 1000 / 60;
        // NOTE(review): the elapsed minutes are reduced modulo 6 here, which is kept as-is;
        // for a value named "hours" a division (e.g. minutes / 60) may have been intended — confirm.
        long hours = minutes % 6;
        // 1.0 forces floating-point division; integer division would collapse the factor to
        // ln(2) for hours == 0 and to 0 for every other value.
        double decay = Math.log(1 + 1.0 / (1 + hours));
        return score * decay;
    }

    /**
     * Stamps the entry with the current time and marks the given action as performed.
     *
     * @param shopAccess the entry to update (modified in place)
     * @param anObject   the action name: {@code click}, {@code collect}, {@code share},
     *                   {@code buy} or {@code pay}; any other value only refreshes the timestamp
     * @return the same {@code shopAccess} instance
     */
    private ShopAccess setuac(ShopAccess shopAccess, String anObject) {
        shopAccess.setDate(System.currentTimeMillis());
        if ("click".equals(anObject)) {
            shopAccess.setClick(true);
        } else if ("collect".equals(anObject)) {
            shopAccess.setCollect(true);
        } else if ("share".equals(anObject)) {
            shopAccess.setShare(true);
        } else if ("buy".equals(anObject)) {
            shopAccess.setBuy(true);
        } else if ("pay".equals(anObject)) {
            shopAccess.setPay(true);
        }
        return shopAccess;
    }

}
